跳到主要内容

RocketMQ 学习04 过滤消息、事务消息

过滤消息

Tag 过滤

在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择想要的消息。

消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用 SQL 表达式筛选消息,例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

SQL 特性可以通过发送消息时的属性来进行计算。在 RocketMQ 定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------

MQ 的 SQL 语法

上面说到 RocketMQ 定义了一些基本的 SQL 语法来支持这个特性。

* 数值比较,比如:>,>=,<,<=,BETWEEN,=;
* 字符比较,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑符号 AND,OR,NOT;

常量支持类型为:

* 数值,比如:123,3.1415;
* 字符,比如:`'abc'`,必须用单引号包裹起来;
* NULL,特殊的常量
* 布尔值,TRUE 或 FALSE

只有使用 push 模式的消费者才能用使用这个 sql 语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

实现消息生产者

发送消息时,能通过 putUserProperty 来设置消息的属性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置一些属性
msg.putUserProperty("a", "2");
SendResult sendResult = producer.send(msg);

producer.shutdown();

实现消息消费者

MessageSelector.bySql 来使用 sql 筛选消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性 a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

事务消息

事务消息是什么?

事务消息:消息队列 RocketMQ 版提供类似 XA 或 Open XA 的分布式事务功能,通过消息队列 RocketMQ 事务消息能达到分布式事务的最终一致。

半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成 “暂不能投递” 状态,处于该种状态下的消息即半事务消息。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于 “半事务消息” 时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

分布式事务消息的优势

消息队列 RocketMQ 分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。

同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。

在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。(其实这里在 InnoDB 引擎那本书里提到过)

流程分析

事务消息交互流程如下图所示。

事务消息发送步骤如下:

1、发送方将半事务消息发送至消息队列 RocketMQ 服务端。 2、消息队列 RocketMQ 服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。 3、发送方开始执行本地事务逻辑。 4、发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

1、在断网或者是应用重启的特殊情况下,上述步骤4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。 2、发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 3、发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。 TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。 TransactionStatus.Unknown:中间状态,它代表需要检查消息队列来确定状态。

创建事务性生产者

使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。

public class Producer {

public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("group5");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.211.151:9876;192.168.211.152:9876");

//添加事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if (StringUtils.equals("TAGA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}

/**
* 该方法时MQ进行消息事务状态回查
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("消息的Tag:" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});

//3.启动producer
producer.start();

String[] tags = {"TAGA", "TAGB", "TAGC"};

for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
//5.发送消息
SendResult result = producer.sendMessageInTransaction(msg, null);
//发送状态
SendStatus status = result.getSendStatus();

System.out.println("发送结果:" + result);

//线程睡1秒
TimeUnit.SECONDS.sleep(2);
}

//6.关闭生产者producer
//producer.shutdown();
}
}

实现事务的监听接口

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。

它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。